package window;

import bean.SensorReading;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.OutputTag;


/**
 * @Description: TODO QQ1667847363
 * @author: xiao kun tai
 * @date:2021/11/4 10:11
 */
public class Window1_TimeWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        DataStream<String> inputStream = env.socketTextStream("192.168.88.106", 7777);

        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });


        /**
         * 时间窗口测试
         */

        /**
         * 增量聚合
         */
        DataStream<Integer> resultStream = dataStream.keyBy("id")
//                .countWindow(10,2) //滑动计数窗口
//                .window(EventTimeSessionWindows.withGap(Time.minutes(1))) //会话窗口
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(15))) //滚动窗口
                .timeWindow(Time.seconds(15)) //滚动窗口
                .aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }

                    @Override
                    public Integer add(SensorReading sensorReading, Integer integer) {
                        return integer + 1;
                    }

                    @Override
                    public Integer getResult(Integer integer) {
                        return integer;
                    }

                    @Override
                    public Integer merge(Integer integer, Integer acc1) {
                        return integer + acc1;
                    }
                });


        /**
         * 全量聚合
         */
        DataStream<Tuple3<String, Long, Integer>> resultStream2 = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .apply(new WindowFunction<SensorReading, Tuple3<String, Long, Integer>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<SensorReading> iterable, Collector<Tuple3<String, Long, Integer>> collector) throws Exception {
                        String id = tuple.getField(0);
                        Long windowEnd = timeWindow.getEnd();
                        Integer count = IteratorUtils.toList(iterable.iterator()).size();
                        collector.collect(new Tuple3<>(id, windowEnd, count));
                    }
                });

        /**
         * 其他可选API
         */
        OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {
        };

        SingleOutputStreamOperator<SensorReading> sumStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
//                .trigger() //触发器
//                .evictor() //移除器
                .allowedLateness(Time.minutes(1))  //允许处理迟到的数据
                .sideOutputLateData(outputTag)   //将迟到的数据放入侧输出流
                .sum("temperature");


        sumStream.getSideOutput(outputTag).print();   //获取侧输出流

//        resultStream2.print();

        env.execute();

    }
}
